package com.lntea.netty.mqtt.heartBeat;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;

/**
 * Title: MqttHeartBeatBrokenHandler.java<br>
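 * Description: Terminal MQTT inbound handler that acknowledges CONNECT, answers PINGREQ with PINGRESP, and closes the channel on DISCONNECT, on any other packet type, or on a reader-idle timeout.<br>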
 * Copyright: Copyright (c) 2015<br>
 * Company: 北京云杉世界信息技术有限公司<br>
 *
 * @author lichao
 * @date 2019/11/19 11:25
 */
@ChannelHandler.Sharable
public class MqttHeartBeatBrokenHandler extends ChannelInboundHandlerAdapter {

    /** Shared singleton; the class declares no instance fields, so one instance can be added to many pipelines. */
    public static final MqttHeartBeatBrokenHandler INSTANCE = new MqttHeartBeatBrokenHandler();

    private MqttHeartBeatBrokenHandler() {}

    /**
     * Handles one inbound object, which is expected to be a decoded {@link MqttMessage}.
     *
     * <ul>
     *   <li>CONNECT is answered with a CONNACK carrying {@code CONNECTION_ACCEPTED};</li>
     *   <li>PINGREQ is answered with a PINGRESP;</li>
     *   <li>DISCONNECT closes the channel;</li>
     *   <li>anything else (other packet types, messages whose decoding failed, or objects that are
     *       not MQTT messages at all) is logged and the channel is closed.</li>
     * </ul>
     *
     * The inbound message is always released before this method returns, because it is never
     * forwarded to the next handler.
     *
     * @param ctx the context of this handler in the channel pipeline
     * @param msg the inbound object read from the channel
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            if (!(msg instanceof MqttMessage)) {
                // Guard the cast: a misconfigured pipeline would otherwise surface as a ClassCastException.
                System.out.println("Unexpected non-MQTT message : " + msg);
                ctx.close();
                return;
            }
            handleMqttMessage(ctx, (MqttMessage) msg);
        } finally {
            // Single release point for every branch; a no-op for objects that are not reference counted.
            ReferenceCountUtil.release(msg);
        }
    }

    /** Validates the decoded message and dispatches on its packet type. */
    private void handleMqttMessage(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
        // NOTE(review): for CONNECT this prints the whole message, which may include the
        // credentials carried in the payload - confirm this is acceptable outside of demos.
        System.out.println("Received MQTT message: " + mqttMessage);

        MqttFixedHeader fixedHeader = mqttMessage.fixedHeader();
        if (mqttMessage.decoderResult().isFailure() || fixedHeader == null) {
            // The MQTT decoder reports malformed packets as a message with a failed decoder
            // result (possibly without a fixed header). Such a message must never be treated
            // as a valid CONNECT/PINGREQ, so drop the connection instead.
            System.out.println("Malformed MQTT message : " + mqttMessage.decoderResult());
            ctx.close();
            return;
        }

        MqttMessageType messageType = fixedHeader.messageType();
        switch (messageType) {
            case CONNECT:
                ctx.writeAndFlush(newConnAckAccepted());
                break;
            case PINGREQ:
                ctx.writeAndFlush(newPingResp());
                break;
            case DISCONNECT:
                ctx.close();
                break;
            default:
                System.out.println("Unexpected message type : " + messageType);
                ctx.close();
        }
    }

    /** Builds a CONNACK that accepts the connection with the session-present flag set to false. */
    private static MqttConnAckMessage newConnAckAccepted() {
        // Build the MQTT fixed header.
        MqttFixedHeader connackFixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK,
                false, MqttQoS.AT_MOST_ONCE, false, 0);
        // Build the MQTT variable header.
        MqttConnAckVariableHeader connAckVariableHeader = new MqttConnAckVariableHeader(
                MqttConnectReturnCode.CONNECTION_ACCEPTED, false);
        // Assemble the connection acknowledgement.
        return new MqttConnAckMessage(connackFixedHeader, connAckVariableHeader);
    }

    /** Builds a PINGRESP, which consists of a fixed header only. */
    private static MqttMessage newPingResp() {
        MqttFixedHeader pingRespFixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP,
                false, MqttQoS.AT_MOST_ONCE, false, 0);
        return new MqttMessage(pingRespFixedHeader);
    }

    /**
     * Closes the channel when a reader-idle {@link IdleStateEvent} arrives, i.e. when nothing has
     * been read from the peer for the configured idle period. Every other event is forwarded to
     * the next handler in the pipeline rather than being silently dropped.
     *
     * @param ctx the context of this handler in the channel pipeline
     * @param evt the user event fired through the pipeline
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent && IdleState.READER_IDLE == ((IdleStateEvent) evt).state()) {
            System.out.println("Channel heart beat lost");
            ctx.close();
            return;
        }
        ctx.fireUserEventTriggered(evt);
    }

    /**
     * Prints the stack trace of any exception raised in the pipeline and closes the channel.
     *
     * @param ctx   the context of this handler in the channel pipeline
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
